"""Test for Template helper."""

from datetime import timedelta
from unittest.mock import patch

import pytest

from homeassistant import config
from homeassistant.components import labs
from homeassistant.components.template import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import SERVICE_RELOAD
from homeassistant.core import Context, HomeAssistant
from homeassistant.helpers import (
    device_registry as dr,
    entity_registry as er,
    issue_registry as ir,
)
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util

from tests.common import (
    MockConfigEntry,
    MockUser,
    async_capture_events,
    async_fire_time_changed,
    get_fixture_path,
)
from tests.typing import WebSocketGenerator


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            },
            "template": [
                {
                    "trigger": {"platform": "event", "event_type": "event_1"},
                    "sensor": {
                        "name": "top level",
                        "state": "{{ trigger.event.data.source }}",
                    },
                },
                {
                    "sensor": {
                        "name": "top level state",
                        "state": "{{ states.sensor.top_level.state }} + 2",
                    },
                    "binary_sensor": {
                        "name": "top level state",
                        "state": "{{ states.sensor.top_level.state == 'init' }}",
                    },
                },
            ],
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reloadable(hass: HomeAssistant) -> None:
    """Test that we can reload."""
    hass.states.async_set("sensor.test_sensor", "mytest")
    await hass.async_block_till_done()
    assert hass.states.get("sensor.top_level_state").state == "unknown + 2"
    assert hass.states.get("binary_sensor.top_level_state").state == "off"

    hass.bus.async_fire("event_1", {"source": "init"})
    await hass.async_block_till_done()
    assert len(hass.states.async_all()) == 5
    assert hass.states.get("sensor.state").state == "mytest"
    assert hass.states.get("sensor.top_level").state == "init"
    await hass.async_block_till_done()
    assert hass.states.get("sensor.top_level_state").state == "init + 2"
    assert hass.states.get("binary_sensor.top_level_state").state == "on"

    await async_yaml_patch_helper(hass, "sensor_configuration.yaml")
    assert len(hass.states.async_all()) == 4

    hass.bus.async_fire("event_2", {"source": "reload"})
    await hass.async_block_till_done()
    assert hass.states.get("sensor.state") is None
    assert hass.states.get("sensor.top_level") is None
    assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off"
    assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0
    assert hass.states.get("sensor.top_level_2").state == "reload"


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            },
            "template": {
                "trigger": {"platform": "event", "event_type": "event_1"},
                "sensor": {
                    "name": "top level",
                    "state": "{{ trigger.event.data.source }}",
                },
            },
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reloadable_can_remove(hass: HomeAssistant) -> None:
    """Test that we can reload and remove all template sensors."""
    hass.states.async_set("sensor.test_sensor", "mytest")
    await hass.async_block_till_done()
    hass.bus.async_fire("event_1", {"source": "init"})
    await hass.async_block_till_done()
    assert len(hass.states.async_all()) == 3
    assert hass.states.get("sensor.state").state == "mytest"
    assert hass.states.get("sensor.top_level").state == "init"

    await async_yaml_patch_helper(hass, "empty_configuration.yaml")
    assert len(hass.states.async_all()) == 1


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            }
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reloadable_stops_on_invalid_config(hass: HomeAssistant) -> None:
    """Test we stop the reload if configuration.yaml is completely broken."""
    hass.states.async_set("sensor.test_sensor", "mytest")
    await hass.async_block_till_done()
    assert hass.states.get("sensor.state").state == "mytest"
    assert len(hass.states.async_all()) == 2

    await async_yaml_patch_helper(hass, "configuration.yaml.corrupt")
    assert hass.states.get("sensor.state").state == "mytest"
    assert len(hass.states.async_all()) == 2


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            }
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reloadable_handles_partial_valid_config(hass: HomeAssistant) -> None:
    """Test we can still setup valid sensors when configuration.yaml has a broken entry."""
    hass.states.async_set("sensor.test_sensor", "mytest")
    await hass.async_block_till_done()
    assert hass.states.get("sensor.state").state == "mytest"
    assert len(hass.states.async_all("sensor")) == 2

    await async_yaml_patch_helper(hass, "broken_configuration.yaml")
    assert len(hass.states.async_all("sensor")) == 3

    assert hass.states.get("sensor.state") is None
    assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off"
    assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            }
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reloadable_multiple_platforms(hass: HomeAssistant) -> None:
    """Test that we can reload."""
    hass.states.async_set("sensor.test_sensor", "mytest")
    await async_setup_component(
        hass,
        "binary_sensor",
        {
            "binary_sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            }
        },
    )
    await hass.async_block_till_done()
    assert hass.states.get("sensor.state").state == "mytest"
    assert hass.states.get("binary_sensor.state").state == "off"
    assert len(hass.states.async_all()) == 3

    await async_yaml_patch_helper(hass, "sensor_configuration.yaml")
    assert len(hass.states.async_all()) == 4
    assert hass.states.get("sensor.state") is None
    assert hass.states.get("sensor.watching_tv_in_master_bedroom").state == "off"
    assert float(hass.states.get("sensor.combined_sensor_energy_usage").state) == 0
    assert hass.states.get("sensor.top_level_2") is not None


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {"value_template": "{{ 1 }}"},
                },
            }
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reload_sensors_that_reference_other_template_sensors(
    hass: HomeAssistant,
) -> None:
    """Test that we can reload sensor that reference other template sensors."""
    await async_yaml_patch_helper(hass, "ref_configuration.yaml")
    assert len(hass.states.async_all()) == 3
    await hass.async_block_till_done()

    next_time = dt_util.utcnow() + timedelta(seconds=1.2)
    with patch(
        "homeassistant.helpers.ratelimit.time.time", return_value=next_time.timestamp()
    ):
        async_fire_time_changed(hass, next_time)
        await hass.async_block_till_done()
    assert hass.states.get("sensor.test1").state == "3"
    assert hass.states.get("sensor.test2").state == "1"
    assert hass.states.get("sensor.test3").state == "2"


@pytest.mark.parametrize(("count", "domain"), [(1, "sensor")])
@pytest.mark.parametrize(
    "config",
    [
        {
            "sensor": {
                "platform": DOMAIN,
                "sensors": {
                    "state": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                    "state2": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                    "state3": {
                        "value_template": "{{ states.sensor.test_sensor.state }}"
                    },
                },
            },
        },
    ],
)
@pytest.mark.usefixtures("start_ha")
async def test_reload_removes_legacy_deprecation(
    hass: HomeAssistant, issue_registry: ir.IssueRegistry
) -> None:
    """Test that we can reload and remove all template sensors."""
    hass.states.async_set("sensor.test_sensor", "old")
    await hass.async_block_till_done()
    assert len(hass.states.async_all()) == 4
    assert hass.states.get("sensor.state").state == "old"
    assert hass.states.get("sensor.state2").state == "old"
    assert hass.states.get("sensor.state3").state == "old"

    assert len(issue_registry.issues) == 3

    await async_yaml_patch_helper(hass, "legacy_template_deprecation.yaml")
    assert len(hass.states.async_all()) == 4
    assert hass.states.get("sensor.state").state == "old"
    assert hass.states.get("sensor.state2").state == "old"
    assert hass.states.get("sensor.state3").state == "old"
    assert len(issue_registry.issues) == 1


async def async_yaml_patch_helper(hass: HomeAssistant, filename: str) -> None:
    """Help update configuration.yaml."""
    yaml_path = get_fixture_path(filename, "template")
    with patch.object(config, "YAML_CONFIG_FILE", yaml_path):
        await hass.services.async_call(
            DOMAIN,
            SERVICE_RELOAD,
            {},
            blocking=True,
        )
        await hass.async_block_till_done()


@pytest.mark.parametrize(
    (
        "config_entry_options",
        "config_user_input",
    ),
    [
        (
            {
                "name": "My template",
                "state": "{{10}}",
                "template_type": "sensor",
            },
            {
                "state": "{{12}}",
            },
        ),
        (
            {
                "template_type": "binary_sensor",
                "name": "My template",
                "state": "{{1 == 1}}",
            },
            {
                "state": "{{1 == 2}}",
            },
        ),
        (
            {
                "template_type": "image",
                "name": "My template",
                "url": "http://example.com",
            },
            {
                "url": "http://example.com",
            },
        ),
        (
            {
                "template_type": "button",
                "name": "My template",
            },
            {},
        ),
        (
            {
                "template_type": "number",
                "name": "My template",
                "state": "{{ 10 }}",
                "min": 0,
                "max": 100,
                "step": 0.1,
                "set_value": {
                    "action": "input_number.set_value",
                    "target": {"entity_id": "input_number.test"},
                    "data": {"value": "{{ value }}"},
                },
            },
            {
                "state": "{{ 11 }}",
                "min": 0,
                "max": 100,
                "step": 0.1,
                "set_value": {
                    "action": "input_number.set_value",
                    "target": {"entity_id": "input_number.test"},
                    "data": {"value": "{{ value }}"},
                },
            },
        ),
        (
            {
                "template_type": "select",
                "name": "My template",
                "state": "{{ 'on' }}",
                "options": "{{ ['off', 'on', 'auto'] }}",
            },
            {
                "state": "{{ 'on' }}",
                "options": "{{ ['off', 'on', 'auto'] }}",
            },
        ),
        (
            {
                "template_type": "switch",
                "name": "My template",
                "value_template": "{{ true }}",
            },
            {
                "value_template": "{{ true }}",
            },
        ),
        (
            {
                "template_type": "event",
                "name": "My template",
                "event_type": "{{ 'single' }}",
                "event_types": "{{ ['single', 'double'] }}",
            },
            {
                "event_type": "{{ 'single' }}",
                "event_types": "{{ ['single', 'double'] }}",
            },
        ),
        (
            {
                "template_type": "update",
                "name": "My template",
                "latest_version": "{{ '1.0' }}",
                "installed_version": "{{ '1.0' }}",
            },
            {
                "latest_version": "{{ '1.0' }}",
                "installed_version": "{{ '1.0' }}",
            },
        ),
    ],
)
async def test_change_device(
    hass: HomeAssistant,
    device_registry: dr.DeviceRegistry,
    entity_registry: er.EntityRegistry,
    config_entry_options: dict[str, str],
    config_user_input: dict[str, str],
) -> None:
    """Test the link between the device and the config entry.

    Test, for each platform, that the device was linked to the
    config entry and the link was removed when the device is
    changed in the integration options.
    """

    def check_template_entities(
        template_entity_id: str,
        device_id: str | None = None,
    ) -> None:
        """Check that the template entity is linked to the correct device."""
        template_entity_ids: list[str] = []
        for template_entity in entity_registry.entities.get_entries_for_config_entry_id(
            template_config_entry.entry_id
        ):
            template_entity_ids.append(template_entity.entity_id)
            assert template_entity.device_id == device_id
        assert template_entity_ids == [template_entity_id]

    # Configure devices registry
    entry_device1 = MockConfigEntry()
    entry_device1.add_to_hass(hass)
    device1 = device_registry.async_get_or_create(
        config_entry_id=entry_device1.entry_id,
        identifiers={("test", "identifier_test1")},
        connections={("mac", "20:31:32:33:34:01")},
    )
    entry_device2 = MockConfigEntry()
    entry_device2.add_to_hass(hass)
    device2 = device_registry.async_get_or_create(
        config_entry_id=entry_device1.entry_id,
        identifiers={("test", "identifier_test2")},
        connections={("mac", "20:31:32:33:34:02")},
    )
    await hass.async_block_till_done()

    device_id1 = device1.id
    assert device_id1 is not None

    device_id2 = device2.id
    assert device_id2 is not None

    # Setup the config entry
    template_config_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options=config_entry_options | {"device_id": device_id1},
        title="Template",
    )
    template_config_entry.add_to_hass(hass)
    assert await hass.config_entries.async_setup(template_config_entry.entry_id)
    await hass.async_block_till_done()

    template_entity_id = f"{config_entry_options['template_type']}.my_template"

    # Confirm that the template config entry has not been added to either device
    # and that the entities are linked to device 1
    for device_id in (device_id1, device_id2):
        device = device_registry.async_get(device_id=device_id)
        assert template_config_entry.entry_id not in device.config_entries
    check_template_entities(template_entity_id, device_id1)

    # Change config options to use device 2 and reload the integration
    result = await hass.config_entries.options.async_init(
        template_config_entry.entry_id
    )
    result = await hass.config_entries.options.async_configure(
        result["flow_id"],
        user_input=config_user_input | {"device_id": device_id2},
    )
    await hass.async_block_till_done()

    # Confirm that the template config entry has not been added to either device
    # and that the entities are linked to device 2
    for device_id in (device_id1, device_id2):
        device = device_registry.async_get(device_id=device_id)
        assert template_config_entry.entry_id not in device.config_entries
    check_template_entities(template_entity_id, device_id2)

    # Change the config options to remove the device and reload the integration
    result = await hass.config_entries.options.async_init(
        template_config_entry.entry_id
    )
    result = await hass.config_entries.options.async_configure(
        result["flow_id"],
        user_input=config_user_input,
    )
    await hass.async_block_till_done()

    # Confirm that the template config entry has not been added to either device
    # and that the entities are not linked to any device
    for device_id in (device_id1, device_id2):
        device = device_registry.async_get(device_id=device_id)
        assert template_config_entry.entry_id not in device.config_entries
    check_template_entities(template_entity_id, None)

    # Confirm that there is no device with the helper config entry
    assert (
        dr.async_entries_for_config_entry(
            device_registry, template_config_entry.entry_id
        )
        == []
    )


async def test_fail_non_numerical_number_settings(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test that non numerical number options causes config entry setup to fail.

    Support for non numerical max, min and step was added in HA Core 2024.9.0 and
    removed in HA Core 2024.9.1.
    """

    options = {
        "template_type": "number",
        "name": "My template",
        "state": "{{ 10 }}",
        "min": "{{ 0 }}",
        "max": "{{ 100 }}",
        "step": "{{ 0.1 }}",
        "set_value": {
            "action": "input_number.set_value",
            "target": {"entity_id": "input_number.test"},
            "data": {"value": "{{ value }}"},
        },
    }
    # Setup the config entry
    template_config_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options=options,
        title="Template",
    )
    template_config_entry.add_to_hass(hass)
    assert not await hass.config_entries.async_setup(template_config_entry.entry_id)
    assert (
        "The 'My template' number template needs to be reconfigured, "
        "max must be a number, got '{{ 100 }}'" in caplog.text
    )


async def test_yaml_reload_when_labs_flag_changes(
    hass: HomeAssistant,
    hass_ws_client: WebSocketGenerator,
    hass_admin_user: MockUser,
    hass_read_only_user: MockUser,
) -> None:
    """Test templates are reloaded when labs flag changes."""
    ws_client = await hass_ws_client(hass)

    assert await async_setup_component(
        hass,
        DOMAIN,
        {
            DOMAIN: {
                "triggers": {
                    "trigger": "event",
                    "event_type": "test_event",
                },
                "sensor": {
                    "name": "hello",
                    "state": "{{ trigger.event.data.stuff }}",
                },
            }
        },
    )
    assert await async_setup_component(hass, labs.DOMAIN, {})
    assert hass.states.get("sensor.hello") is not None
    assert hass.states.get("sensor.bye") is None
    listeners = hass.bus.async_listeners()
    assert listeners.get("test_event") == 1
    assert listeners.get("test_event2") is None

    context = Context()
    hass.bus.async_fire("test_event", {"stuff": "foo"}, context=context)
    await hass.async_block_till_done()
    assert hass.states.get("sensor.hello").state == "foo"

    test_reload_event = async_capture_events(hass, "event_template_reloaded")

    # Check we reload whenever the labs flag is set, even if it's already enabled
    last_state = "unknown"
    for enabled, set_state in (
        (True, "foo"),
        (True, "bar"),
        (False, "beer"),
        (False, "good"),
    ):
        test_reload_event.clear()

        with patch(
            "homeassistant.config.load_yaml_config_file",
            autospec=True,
            return_value={
                DOMAIN: {
                    "triggers": {
                        "trigger": "event",
                        "event_type": "test_event2",
                    },
                    "sensor": {
                        "name": "bye",
                        "state": "{{ trigger.event.data.stuff }}",
                    },
                }
            },
        ):
            await ws_client.send_json_auto_id(
                {
                    "type": "labs/update",
                    "domain": "automation",
                    "preview_feature": "new_triggers_conditions",
                    "enabled": enabled,
                }
            )

            msg = await ws_client.receive_json()
            assert msg["success"]
            await hass.async_block_till_done()

        assert len(test_reload_event) == 1

        assert hass.states.get("sensor.hello") is None
        assert hass.states.get("sensor.bye") is not None
        listeners = hass.bus.async_listeners()
        assert listeners.get("test_event") is None
        assert listeners.get("test_event2") == 1

        hass.bus.async_fire("test_event", {"stuff": "foo"}, context=context)
        await hass.async_block_till_done()
        assert hass.states.get("sensor.bye").state == last_state

        hass.bus.async_fire("test_event2", {"stuff": set_state}, context=context)
        await hass.async_block_till_done()
        assert hass.states.get("sensor.bye").state == set_state
        last_state = set_state


async def test_config_entry_reload_when_labs_flag_changes(
    hass: HomeAssistant,
    hass_ws_client: WebSocketGenerator,
    hass_admin_user: MockUser,
    hass_read_only_user: MockUser,
) -> None:
    """Test templates are reloaded when labs flag changes."""
    ws_client = await hass_ws_client(hass)

    template_config_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options={
            "name": "hello",
            "template_type": "sensor",
            "state": "{{ 'foo' }}",
        },
        title="My template",
    )
    template_config_entry.add_to_hass(hass)
    assert await hass.config_entries.async_setup(template_config_entry.entry_id)
    await hass.async_block_till_done()
    assert await async_setup_component(hass, labs.DOMAIN, {})

    assert hass.states.get("sensor.hello") is not None
    assert hass.states.get("sensor.hello").state == "foo"

    # Check we reload whenever the labs flag is set, even if it's already enabled
    for enabled, set_state in (
        (True, "beer"),
        (True, "is"),
        (False, "very"),
        (False, "good"),
    ):
        hass.config_entries.async_update_entry(
            template_config_entry,
            options={
                "name": "hello",
                "template_type": "sensor",
                "state": f"{{{{ '{set_state}' }}}}",
            },
        )
        with patch(
            "homeassistant.config.load_yaml_config_file",
            autospec=True,
            return_value={},
        ):
            await ws_client.send_json_auto_id(
                {
                    "type": "labs/update",
                    "domain": "automation",
                    "preview_feature": "new_triggers_conditions",
                    "enabled": enabled,
                }
            )

            msg = await ws_client.receive_json()
            assert msg["success"]
            await hass.async_block_till_done()

        assert hass.states.get("sensor.hello") is not None
        assert hass.states.get("sensor.hello").state == set_state


async def test_migration_1_1(
    hass: HomeAssistant,
    device_registry: dr.DeviceRegistry,
    entity_registry: er.EntityRegistry,
) -> None:
    """Test migration from v1.1 removes template config entry from device."""

    device_config_entry = MockConfigEntry()
    device_config_entry.add_to_hass(hass)
    device_entry = device_registry.async_get_or_create(
        config_entry_id=device_config_entry.entry_id,
        identifiers={("test", "identifier_test")},
        connections={("mac", "30:31:32:33:34:35")},
    )

    template_config_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options={
            "name": "My template",
            "template_type": "sensor",
            "state": "{{ 'foo' }}",
            "device_id": device_entry.id,
        },
        title="My template",
        version=1,
        minor_version=1,
    )
    template_config_entry.add_to_hass(hass)

    # Add the helper config entry to the device
    device_registry.async_update_device(
        device_entry.id, add_config_entry_id=template_config_entry.entry_id
    )

    # Check preconditions
    device_entry = device_registry.async_get(device_entry.id)
    assert template_config_entry.entry_id in device_entry.config_entries

    await hass.config_entries.async_setup(template_config_entry.entry_id)
    await hass.async_block_till_done()

    assert template_config_entry.state is ConfigEntryState.LOADED

    # Check that the helper config entry is removed from the device and the helper
    # entity is linked to the source device
    device_entry = device_registry.async_get(device_entry.id)
    assert template_config_entry.entry_id not in device_entry.config_entries
    template_entity_entry = entity_registry.async_get("sensor.my_template")
    assert template_entity_entry.device_id == device_entry.id

    assert template_config_entry.version == 1
    assert template_config_entry.minor_version == 2


async def test_migration_from_future_version(
    hass: HomeAssistant,
) -> None:
    """Test migration from future version."""
    config_entry = MockConfigEntry(
        data={},
        domain=DOMAIN,
        options={
            "name": "hello",
            "template_type": "sensor",
            "state": "{{ 'foo' }}",
        },
        title="My template",
        version=2,
        minor_version=1,
    )
    config_entry.add_to_hass(hass)
    await hass.config_entries.async_setup(config_entry.entry_id)
    await hass.async_block_till_done()
    assert config_entry.state is ConfigEntryState.MIGRATION_ERROR
